Add source-layer shuffle to iceberg-source for correct and scalable C… #6682

lawofcycles wants to merge 9 commits into opensearch-project:main
Conversation
@dlvenable I would appreciate your review on this PR. This implements the source-layer shuffle discussed in #6554 (comment), addressing both the correctness bug (cross-partition UPDATE data loss) and the scalability limitation (bounds-based pairing fallback to single-node processing) described in #6666. Once this is merged, I plan to update the CDC RFC (#6552) to reflect the shuffle design.

I have opened a documentation PR for the Iceberg source plugin: opensearch-project/documentation-website#12164. It reflects the current state of the implementation and would be useful as a reference during review.

The shuffle HTTP server supports TLS for encryption in transit. Request-level authentication (e.g. mutual TLS) is not included in this PR but can be added as a follow-up. If authentication should be included in the initial implementation, please let me know.
Fixed a bug where

Fixed another issue: Previously,

During multi-node performance testing, I found a race condition where

✅ License Header Check Passed. All newly added files have proper license headers. Great work! 🎉
Force-pushed from 2d1d6f1 to bd4e2c5
I addressed bugs and improvement points that I found during the e2e performance test. |
dlvenable left a comment:

Thanks @lawofcycles for this improvement!
```java
/**
 * Standalone Armeria HTTP server for serving shuffle data.
 * Runs independently from PeerForwarder to avoid core dependencies.
 */
public class ShuffleHttpServer {
```
We should use the common HTTP server code. This is important for providing consistent experience with ports that Data Prepper may open. You can see how this is used in the http source for example.
Migrated to use CreateServer from http-common. Added a lightweight createHTTPServer overload that takes a CertificateProvider and optional ArmeriaHttpAuthenticationProvider, without the buffer, throttling, and health check dependencies. ShuffleConfig now extends BaseHttpServerConfig so it inherits the standard TLS configuration (file, S3, ACM) via CertificateProviderFactory.
```java
@JsonProperty("target_partition_size")
private ByteCount targetPartitionSize = ByteCount.parse(DEFAULT_TARGET_PARTITION_SIZE);

@JsonProperty("server_port")
```
The approach of using S3 versus local disk should probably be handled via a plugin. This would allow us to extend it more. This is how we support different source coordination mechanisms or different authentication mechanisms in plugins like the http source.
Since this source is @Experimental we can break the config between versions so we don't need to do this now.
Acknowledged. The current ShuffleStorage interface already abstracts the storage backend, so extending it to a plugin model in a future version should be straightforward.
```java
private static final Logger LOG = LoggerFactory.getLogger(ShuffleNodeClient.class);
private static final int MAX_RETRIES = 3;

private final HttpClient httpClient;
```
Let's use Armeria's web client for web requests.
Replaced java.net.http.HttpClient with Armeria WebClient. TLS verification bypass now uses ClientFactory.builder().tlsNoVerify() instead of a custom TrustAllCerts implementation.
…DC processing

Introduce a pull-based shuffle mechanism for processing snapshots that contain DELETE operations (UPDATE/DELETE in Copy-on-Write tables). When a snapshot contains DeletedDataFileScanTasks, records are shuffled by identifier_columns hash across nodes so that carryover removal and UPDATE merge operate on complete data, including cross-partition updates.

Signed-off-by: Sotaro Hikita <bering1814@gmail.com>
Signed-off-by: Sotaro Hikita <bering1814@gmail.com>
Signed-off-by: Sotaro Hikita <bering1814@gmail.com>
… to shuffle server Signed-off-by: Sotaro Hikita <bering1814@gmail.com>
…lient utility Signed-off-by: Sotaro Hikita <bering1814@gmail.com>
Signed-off-by: Sotaro Hikita <bering1814@gmail.com>
…te before partitions Signed-off-by: Sotaro Hikita <bering1814@gmail.com>
…eader for correct type handling

The shuffle record serialization used GenericDatumWriter/GenericDatumReader which only handle Avro native types. Iceberg Records contain Java types like OffsetDateTime for timestamptz columns, causing AvroRuntimeException during SHUFFLE_WRITE. Replace with Iceberg's DataWriter and PlannedDataReader which handle the Iceberg-to-Avro type conversion internally. Extract serialization logic into RecordAvroSerializer utility class with roundtrip tests covering temporal types.

Also fix shuffle write completion key race condition by creating GlobalState before partitions in processShuffleSnapshot, matching the order used in processInsertOnlySnapshot.

Signed-off-by: Sotaro Hikita <bering1814@gmail.com>
Migrate ShuffleHttpServer to use CreateServer from http-common and ShuffleNodeClient to use Armeria WebClient. ShuffleConfig now extends BaseHttpServerConfig for consistent TLS configuration including ACM and S3 certificate support. Signed-off-by: Sotaro Hikita <bering1814@gmail.com>
Force-pushed from bd4e2c5 to 84858c6
```java
@Override
public void addRecord(final int partitionNumber, final byte operation, final int changeOrdinal,
                      final byte[] serializedRecord) {
    buffer.add(new BufferedRecord(partitionNumber, operation, changeOrdinal, serializedRecord));
```
The writer buffers all records in memory before sorting. With multiple concurrent SHUFFLE_WRITE tasks on the same node, combined heap usage could be significant. Consider adding a configurable memory limit that spills to disk when exceeded.
Thank you for the review. In the current implementation, each node runs a single ChangelogWorker thread that processes tasks sequentially via acquireAvailablePartition, so multiple SHUFFLE_WRITE tasks do not run concurrently on the same node. The memory usage per task is bounded by one Iceberg data file (default 512MB).
Parallelizing worker threads is something I am planning to work on for performance improvement. When implementing that, adding a configurable memory limit with spill-to-disk will be necessary to keep the combined heap usage under control. I will create a follow-up issue to track both together.
Thanks for the clarification and for tracking it.
Performance test results

Tested with an Iceberg table (NYC Yellow Taxi 2024, 41 million rows, 19 columns, partitioned by day) on ECS Fargate (2 vCPU / 16 GiB per node, ARM64). Source coordination backed by DynamoDB (provisioned 5,000 RCU / 5,000 WCU). OpenSearch cluster: 6 x r7g.4xlarge. Measurements represent the iceberg-source plugin's processing time from snapshot detection to pipeline buffer submission.

UPDATE/DELETE operations target randomly sampled rows, so the affected rows are scattered across many data files. Due to Copy on Write, each affected data file is fully rewritten even if only one row changes, so the shuffle phase reads all rows in the rewritten files.
INSERT operations complete in under 10 seconds regardless of data volume (100K vs 1M rows), as the bottleneck is task coordination overhead rather than data reading. Small UPDATE/DELETE (50K rows) with shuffle completes in 21 to 37 seconds, scaling well with node count.

UPDATE/DELETE is significantly slower than INSERT even for smaller row counts (50K UPDATE: 28 to 35s vs 100K INSERT: 5 to 8s). This is because Iceberg's Copy on Write rewrites entire data files for each affected row. The shuffle phase must read all rows in the rewritten files, not just the changed rows. A 50K row UPDATE scattered across many data files results in reading millions of rows.

Large UPDATE/DELETE (500K+ rows) creates thousands of SHUFFLE_WRITE tasks (one per data file). Source coordination stores all tasks under the same DynamoDB partition key, which has a per-partition write throughput limit of 1,000 WCU/s regardless of provisioned capacity. This causes write throttling that degrades performance. Grouping multiple files into a single SHUFFLE_WRITE task based on

(*) The 16-node UPDATE created more SHUFFLE_WRITE tasks than the 8-node run (2,349 vs 2,068), which amplified the DynamoDB throttling impact. Task count varies between runs because it depends on the number of data files affected by Copy on Write.
Description
This PR adds source-layer shuffle to the iceberg-source plugin for correct and scalable CDC processing of snapshots containing DELETE operations (UPDATE/DELETE in Copy-on-Write tables).
Problems solved
Correctness: When a partition column is updated (e.g. `region` changes from `US` to `EU`), Iceberg produces a DELETE in the old partition and an INSERT in the new partition. The current implementation groups tasks by Iceberg partition, so these end up in separate tasks and the UPDATE merge cannot detect the cross-partition update. If the DELETE arrives at the sink after the INSERT, the document is lost. Shuffle routes all records with the same `identifier_columns` to the same node, enabling correct cross-partition UPDATE detection.

Scalability: The previous bounds-based pairing heuristic attempts to match DELETED and ADDED files by column statistics. When pairing fails (e.g. an UPDATE changes a column's min/max bounds), all files fall back to a single task processed by one node. This is especially problematic for unpartitioned tables or large partitions, where the fallback can include hundreds of files. Shuffle distributes the work evenly across all nodes by hash partitioning, regardless of Iceberg partition structure or file bounds.
Shuffle overview
When a snapshot contains `DeletedDataFileScanTask`s, all records in the snapshot are redistributed by `identifier_columns` hash so that records sharing the same `identifier_columns` value are guaranteed to land on the same node. Once co-located, each node can independently perform carryover removal and UPDATE merge, because it has both the DELETE and INSERT for any given document.

This redistribution uses a pull-based two-phase approach based on Spark's shuffle architecture (`SortShuffleManager`, `IndexShuffleBlockResolver`). All workers must finish Phase 1 before Phase 2 can begin, because a reader needs the complete set of records for its hash partitions across all nodes. Between the two phases, the leader collects index metadata from all shuffle files and coalesces adjacent small hash partitions into SHUFFLE_READ tasks (same approach as Spark's Adaptive Query Execution), so the number of Phase 2 tasks is determined by actual data distribution rather than the fixed partition count.

Phase 1 (SHUFFLE_WRITE): Each worker reads its assigned data files, computes `hash(identifier_columns) % P` for each record (P defaults to 64, configurable via `shuffle.partitions`), and writes records to local disk sorted by hash partition number. Each task produces one data file and one index file.

Phase 2 (SHUFFLE_READ): Each worker pulls its assigned hash partition range from all nodes, then performs carryover removal and UPDATE merge on the collected records. Since all records with the same `identifier_columns` value are guaranteed to be on the same node, cross-partition updates are correctly detected.

Shuffle storage is abstracted behind a `ShuffleStorage` interface. The current implementation uses local disk (`LocalDiskShuffleStorage`), but the interface allows alternative storage backends (e.g. S3) to be plugged in without changes to the shuffle orchestration or worker logic.

The pull-based design decouples writing from network transfer. Phase 1 is purely local I/O with no network dependency, so it is unaffected by slow or failing remote nodes. Phase 2 readers pull data independently and can retry failed transfers without requiring the writer to resend.
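As an illustration of the routing rule, here is a minimal sketch of hashing on `identifier_columns` values; the class and method names are hypothetical, not the PR's actual API:

```java
import java.util.List;

// Illustrative sketch: a record's hash partition depends only on its
// identifier_columns values, never on the Iceberg partition, so the DELETE
// and INSERT halves of a cross-partition UPDATE map to the same partition.
class ShufflePartitioner {
    private final int numPartitions; // "P", default 64 (shuffle.partitions)

    ShufflePartitioner(final int numPartitions) {
        this.numPartitions = numPartitions;
    }

    int partitionFor(final List<?> identifierColumnValues) {
        // floorMod keeps the result in [0, P) even when hashCode() is negative.
        return Math.floorMod(identifierColumnValues.hashCode(), numPartitions);
    }
}
```

Because the Iceberg partition value is not part of the key, the DELETE emitted in the old partition and the INSERT in the new partition always land in the same hash partition, and therefore on the same node.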
Write-side partitioning uses a fixed count (P=64 by default) because the number of nodes may vary and a sufficiently large P ensures even distribution regardless of cluster size. When the actual data volume is small relative to P, many partitions will be empty or contain very little data. After all writes complete, the leader reads the index files to learn the actual size of each partition, then coalesces adjacent small or empty partitions into larger SHUFFLE_READ tasks targeting a configurable size (default 64MB), avoiding unnecessary tasks.
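The coalescing step can be sketched as follows, assuming the leader has already summed each hash partition's size across all nodes' index files; the class name and the exact merge policy are illustrative, not the PR's implementation:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative coalescer: walk hash partitions in order, skip empty ones,
// and group adjacent partitions until the accumulated size reaches the
// target, emitting one [start, end] range per SHUFFLE_READ task.
class PartitionCoalescer {
    static List<int[]> coalesce(final long[] partitionSizes, final long targetBytes) {
        final List<int[]> ranges = new ArrayList<>();
        int start = -1;
        long accumulated = 0;
        for (int i = 0; i < partitionSizes.length; i++) {
            if (partitionSizes[i] == 0 && start == -1) {
                continue; // skip empty partitions between ranges
            }
            if (start == -1) {
                start = i;
            }
            accumulated += partitionSizes[i];
            if (accumulated >= targetBytes) {
                ranges.add(new int[]{start, i});
                start = -1;
                accumulated = 0;
            }
        }
        if (start != -1) {
            ranges.add(new int[]{start, partitionSizes.length - 1}); // trailing remainder
        }
        return ranges;
    }
}
```

With P=64 and little data, most partitions collapse into a handful of ranges, so the Phase 2 task count tracks actual data volume rather than the fixed partition count.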
For INSERT-only snapshots (no `DeletedDataFileScanTask`), the shuffle is skipped entirely and each file is processed as an independent task.

```mermaid
flowchart TD
  subgraph LeaderScheduler
    A[Detect snapshot] --> B{DELETED files?}
    B -->|No| C[INSERT-only: create one<br/>CHANGELOG_TASK per file]
    B -->|Yes| D[Phase 1: create SHUFFLE_WRITE tasks<br/>1 data file = 1 task]
    D --> E[Wait for all SHUFFLE_WRITE<br/>to complete]
    E --> F{shuffle-failed?}
    F -->|Yes| G[Clean up shuffle files<br/>on all nodes]
    G --> A
    F -->|No| H[Collect index files from all nodes<br/>via local disk and HTTP]
    H --> I[Coalesce:<br/>skip empty partitions<br/>merge small partitions<br/>target 64MB per task]
    I --> J[Phase 2: create SHUFFLE_READ tasks<br/>1 task = 1 partition range]
    J --> K[Wait for all SHUFFLE_READ<br/>to complete]
    K --> K2[Clean up shuffle files<br/>on all nodes]
    K2 --> L[Update lastProcessedSnapshotId]
    L --> A
    C --> M[Wait for all CHANGELOG_TASKs<br/>to complete]
    M --> L
  end
```

```mermaid
sequenceDiagram
  participant LS as LeaderScheduler
  participant SC as SourceCoordinator
  participant W1 as Worker Node 1
  participant W2 as Worker Node 2
  participant H1 as Node 1 HTTP Server
  participant H2 as Node 2 HTTP Server
  Note over LS: Snapshot S5 detected (contains DELETED files)
  rect rgb(230, 245, 255)
    Note over LS,H2: Phase 1: SHUFFLE_WRITE
    LS->>SC: createPartition(SHUFFLE_WRITE, file-A)
    LS->>SC: createPartition(SHUFFLE_WRITE, file-B)
    LS->>SC: createPartition(SHUFFLE_WRITE, file-C)
    W1->>SC: acquirePartition(SHUFFLE_WRITE)
    SC-->>W1: file-A task
    W2->>SC: acquirePartition(SHUFFLE_WRITE)
    SC-->>W2: file-B task
    W1->>W1: Read file-A, hash(id_cols) % 64, sort by partition#
    W1->>W1: Write data + index files to local disk
    W1->>SC: Register nodeAddress in GlobalState
    W1->>SC: completePartition(file-A)
    W2->>W2: Read file-B, hash(id_cols) % 64, sort by partition#
    W2->>W2: Write data + index files to local disk
    W2->>SC: Register nodeAddress in GlobalState
    W2->>SC: completePartition(file-B)
    W1->>SC: acquirePartition(SHUFFLE_WRITE)
    SC-->>W1: file-C task
    W1->>W1: Write data + index files to local disk
    W1->>SC: Register nodeAddress in GlobalState
    W1->>SC: completePartition(file-C)
  end
  Note over LS: All SHUFFLE_WRITE complete
  rect rgb(255, 245, 230)
    Note over LS,H2: Barrier: index collection + coalesce
    LS->>SC: Read shuffle-locations GlobalState
    LS->>LS: Read local index files (taskA, taskC)
    LS->>H2: GET /shuffle/{snapshotId}/{taskId}/index (taskB)
    H2-->>LS: index offsets
    LS->>LS: Compute per-partition sizes, coalesce (target 64MB)
  end
  rect rgb(230, 255, 230)
    Note over LS,H2: Phase 2: SHUFFLE_READ
    LS->>SC: createPartition(SHUFFLE_READ, partitions 0-20)
    LS->>SC: createPartition(SHUFFLE_READ, partitions 21-63)
    W1->>SC: acquirePartition(SHUFFLE_READ)
    SC-->>W1: partitions 0-20
    W2->>SC: acquirePartition(SHUFFLE_READ)
    SC-->>W2: partitions 21-63
    W1->>W1: Read partitions 0-20 from local disk (taskA, taskC)
    W1->>H2: GET /shuffle/.../data (taskB, partitions 0-20)
    H2-->>W1: compressed blocks
    W1->>W1: Carryover removal + UPDATE merge
    W1->>W1: Write to Buffer
    W1->>SC: completePartition
    W2->>W2: Read partitions 21-63 from local disk (taskB)
    W2->>H1: GET /shuffle/.../data (taskA, taskC, partitions 21-63)
    H1-->>W2: compressed blocks
    W2->>W2: Carryover removal + UPDATE merge
    W2->>W2: Write to Buffer
    W2->>SC: completePartition
  end
  Note over LS: All SHUFFLE_READ complete
  LS->>LS: Delete local shuffle files
  LS->>H1: DELETE /shuffle/{snapshotId}
  LS->>H2: DELETE /shuffle/{snapshotId}
  LS->>SC: Update lastProcessedSnapshotId
```

A SHUFFLE_READ worker retries HTTP pulls up to 3 times with exponential backoff. If all retries fail, it writes a `shuffle-failed` GlobalState entry. LeaderScheduler detects this during its completion polling, cleans up shuffle files, and aborts without updating `lastProcessedSnapshotId`. The same snapshot is retried from Phase 1 on the next polling cycle.

Shuffle data format
Each SHUFFLE_WRITE task produces one data file and one index file.
Index file: `(numPartitions + 1)` long offset values (8 bytes each). Partition `i`'s data occupies the byte range `offset[i]` to `offset[i+1]` in the data file. Empty partitions have `offset[i] == offset[i+1]`.

Data file: LZ4-compressed blocks in partition order. Each block contains serialized records.
Records are serialized using Avro binary encoding derived from the Iceberg table schema. The schema is not stored in the shuffle files since both write and read sides derive it from the same table.
The writer buffers all records in memory, sorts them by partition number, compresses each partition, and writes everything in a single pass. Memory usage per SHUFFLE_WRITE task is bounded by one Iceberg data file (default 512MB).
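The offset arithmetic implied by this layout can be sketched as follows; `ShuffleIndex` is a hypothetical helper, but it mirrors the described format of `(numPartitions + 1)` 8-byte offsets with partition `i` spanning `offset[i]` to `offset[i+1]`:

```java
import java.nio.ByteBuffer;

// Illustrative helper for the index-file contract described above.
class ShuffleIndex {
    final long[] offsets; // (numPartitions + 1) entries

    // Build cumulative offsets from per-partition compressed block lengths.
    ShuffleIndex(final long[] partitionByteLengths) {
        offsets = new long[partitionByteLengths.length + 1];
        for (int i = 0; i < partitionByteLengths.length; i++) {
            offsets[i + 1] = offsets[i] + partitionByteLengths[i];
        }
    }

    // Serialize as 8 bytes per offset value, in partition order.
    byte[] toBytes() {
        final ByteBuffer buf = ByteBuffer.allocate(offsets.length * Long.BYTES);
        for (final long offset : offsets) {
            buf.putLong(offset);
        }
        return buf.array();
    }

    // Byte range a reader would request for one hash partition.
    long offsetOf(final int partition) { return offsets[partition]; }
    long lengthOf(final int partition) { return offsets[partition + 1] - offsets[partition]; }
    boolean isEmpty(final int partition) { return lengthOf(partition) == 0; }
}
```

A reader that has fetched this index can compute the `offset`/`length` pair for any partition range without touching the data file.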
Shuffle orchestration
LeaderScheduler coordinates the phases using barrier synchronization via SourceCoordinator.
- Runs `IncrementalChangelogScan` and checks for DELETED files
- Coalesces hash partitions toward `target_partition_size` (similar to Spark's Adaptive Query Execution). This avoids creating excessive tasks when most hash partitions are small
- Issues `DELETE /shuffle/{snapshotId}` and cleans up local shuffle files. Each node's `cleanupAll()` at startup serves as a safety net for any missed cleanups

Partition keys are deterministic (based on file paths and partition ranges) to ensure idempotency if LeaderScheduler crashes and replans the same snapshot.
Node-to-node data transfer
Each Data Prepper node runs an Armeria HTTP server (default port 4995) to serve shuffle data.
- `GET /shuffle/{snapshotId}/{taskId}/index`
- `GET /shuffle/{snapshotId}/{taskId}/data?offset={offset}&length={length}`
- `DELETE /shuffle/{snapshotId}`

SHUFFLE_READ workers pull data from each SHUFFLE_WRITE task. For same-node tasks, data is read directly from disk without HTTP. For remote tasks, the worker first fetches the index to compute offsets, then fetches each partition's compressed block individually.
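The retry behavior for failed pulls (up to 3 attempts with exponential backoff before writing the `shuffle-failed` GlobalState entry) could look roughly like this; the helper and its base delay are assumptions for illustration, not the PR's actual code:

```java
import java.util.concurrent.Callable;

// Illustrative retry loop for a single partition pull.
class RetryingPull {
    static final int MAX_RETRIES = 3; // matches the limit described in this PR

    // Attempt a pull up to MAX_RETRIES times, doubling the delay between attempts.
    static <T> T pullWithRetry(final Callable<T> pull, final long baseBackoffMillis)
            throws Exception {
        Exception lastFailure = null;
        long backoffMillis = baseBackoffMillis;
        for (int attempt = 1; attempt <= MAX_RETRIES; attempt++) {
            try {
                return pull.call();
            } catch (final Exception e) {
                lastFailure = e;
                if (attempt < MAX_RETRIES) {
                    Thread.sleep(backoffMillis);
                    backoffMillis *= 2; // exponential backoff
                }
            }
        }
        // In the real worker, exhausting retries triggers the "shuffle-failed"
        // GlobalState entry so the leader can clean up and replan the snapshot.
        throw lastFailure;
    }
}
```

Because Phase 1 output stays on the writer's disk until cleanup, a reader can retry any pull independently without the writer resending anything.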
The shuffle HTTP server supports TLS for encryption in transit. Request-level authentication (e.g. mutual TLS) is not included in this PR but can be added as a follow-up.
Shuffle key and identifier_columns
The `identifier_columns` hash is sufficient as the shuffle key for both carryover removal and UPDATE merge. Carryover pairs (DELETE+INSERT with all data columns identical) necessarily have the same `identifier_columns` values, so they are routed to the same node.

`identifier_columns` is now required when processing snapshots that contain DELETE operations. Without it, OpenSearch document IDs cannot be determined and correct UPDATE/DELETE processing is impossible. The plugin throws `IllegalStateException` if DELETED files are detected without `identifier_columns` configured. For INSERT-only tables, `identifier_columns` is not required.

Configuration
- `shuffle.partitions`
- `shuffle.target_partition_size`
- `shuffle.server_port`
- `shuffle.ssl`
- `shuffle.ssl_certificate_file`
- `shuffle.ssl_key_file`
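As a sketch, these options might appear in a pipeline definition roughly like this; the nesting, values, and file paths are assumptions based on the defaults described in this PR (64 partitions, 64MB target, port 4995), not copied from the plugin documentation:

```yaml
source:
  iceberg:
    # ... catalog/table settings elided ...
    shuffle:
      partitions: 64                  # fixed hash partition count P
      target_partition_size: 64mb     # coalescing target per SHUFFLE_READ task
      server_port: 4995               # shuffle HTTP server port
      ssl: true
      ssl_certificate_file: /path/to/cert.pem   # hypothetical path
      ssl_key_file: /path/to/key.pem            # hypothetical path
```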
- Removed `TaskGrouper` (replaced entirely by shuffle)
- Moved snapshot scanning from `TaskGrouper` to `LeaderScheduler` to avoid redundant `IncrementalChangelogScan` calls for INSERT-only snapshots

Issues Resolved
Resolves #6666
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.